package cn.doitedu.pojo;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Streams event records from Kafka into StarRocks in near real time.
 * <p>
 * The pipeline is expressed entirely in Flink SQL: a Kafka source table,
 * a StarRocks sink table, and a continuous {@code INSERT INTO ... SELECT}
 * that bridges the two. A synthetic unique id is derived from the Kafka
 * metadata columns (topic, partition, offset) so that re-consumed records
 * map to the same id.
 */
public class KafkaEvent2StarRocks {

    public static void main(String[] args) throws Exception {

        // Local environment with the Flink web UI enabled (useful for debugging).
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Source table: JSON events consumed from the Kafka topic 'eagle-applog'.
        // The metadata columns topic/partition/offset are declared VIRTUAL so they
        // are readable in queries but never written back on insert.
        tEnv.executeSql(
                "CREATE TABLE kafka_event (        \n" +
                        "  `account` VARCHAR,        \n" +
                        "  `appId` VARCHAR,          \n" +
                        "  `appVersion` VARCHAR,     \n" +
                        "  `carrier` VARCHAR,        \n" +
                        "  `deviceId` VARCHAR,       \n" +
                        "  `eventId` VARCHAR,        \n" +
                        "  `ip` VARCHAR,             \n" +
                        "  `latitude` DOUBLE,        \n" +
                        "  `longitude` DOUBLE,       \n" +
                        "  `netType` VARCHAR,        \n" +
                        "  `osName` VARCHAR,         \n" +
                        "  `osVersion` VARCHAR,      \n" +
                        "  `releaseChannel` VARCHAR, \n" +
                        "  `resolution` VARCHAR,     \n" +
                        "  `sessionId` VARCHAR,      \n" +
                        "  `timeStamp` BIGINT,       \n" +
                        "  `properties` VARCHAR,     \n" +
                        "  `topic` VARCHAR METADATA VIRTUAL, \n" +
                        "  `partition` BIGINT METADATA VIRTUAL,  \n" +
                        "  `offset` BIGINT METADATA VIRTUAL      \n" +
                        ") WITH (\n" +
                        "  'connector' = 'kafka',\n" +
                        "  'topic' = 'eagle-applog',\n" +
                        "  'properties.bootstrap.servers' = 'node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092',\n" +
                        "  'properties.group.id' = 'g010',\n" +
                        "  'scan.startup.mode' = 'earliest-offset',\n" +
                        "  'format' = 'json',\n" +
                        "  'json.ignore-parse-errors' = 'true'\n" +
                        ")"
        );

        // Sink table: StarRocks connector writing to test.eagle_detail via stream load.
        // Non-printable column/row separators (\x01, \x02) are used so that free-text
        // fields such as 'properties' cannot collide with the delimiters.
        tEnv.executeSql(
                "CREATE TABLE result_sink(" +
                        "  `deviceId` VARCHAR,       \n" +
                        "  `timeStamp` BIGINT,       \n" +
                        "  `id` VARCHAR,             \n" +
                        "  `account` VARCHAR,        \n" +
                        "  `appId` VARCHAR,          \n" +
                        "  `appVersion` VARCHAR,     \n" +
                        "  `carrier` VARCHAR,        \n" +
                        "  `eventId` VARCHAR,        \n" +
                        "  `ip` VARCHAR,             \n" +
                        "  `latitude` DOUBLE,        \n" +
                        "  `longitude` DOUBLE,       \n" +
                        "  `netType` VARCHAR,        \n" +
                        "  `osName` VARCHAR,         \n" +
                        "  `osVersion` VARCHAR,      \n" +
                        "  `releaseChannel` VARCHAR, \n" +
                        "  `resolution` VARCHAR,     \n" +
                        "  `sessionId` VARCHAR,      \n" +
                        "  `properties` VARCHAR \n" +
                        ") WITH ( " +
                        "  'connector' = 'starrocks'," +
                        "  'jdbc-url'='jdbc:mysql://node-1.51doit.cn:9030?characterEncoding=utf-8'," +
                        "  'load-url'='node-1.51doit.cn:8030'," +
                        "  'database-name' = 'test'," +
                        "  'table-name' = 'eagle_detail'," +
                        "  'username' = 'root'," +
                        "  'password' = ''," +
                        "  'sink.buffer-flush.max-rows' = '1000000'," +
                        "  'sink.buffer-flush.max-bytes' = '300000000'," +
                        "  'sink.buffer-flush.interval-ms' = '30000'," +
                        "  'sink.properties.column_separator' = '\\x01'," +
                        "  'sink.properties.row_delimiter' = '\\x02'," +
                        "  'sink.max-retries' = '3'" +
                        ")"
        );

        // Continuous INSERT from the Kafka source into the StarRocks sink.
        // The unique id is 'topic_partition_offset', stable across replays.
        // executeSql(INSERT ...) submits the streaming job immediately and
        // returns a TableResult handle for it.
        TableResult tableResult = tEnv.executeSql("INSERT INTO result_sink " +
                "SELECT deviceId,  `timeStamp`, concat_ws('_', `topic`, CAST(`partition` AS VARCHAR), CAST(`offset` AS VARCHAR)) AS id," +
                " account, appId, appVersion, carrier, eventId, ip, latitude, longitude, netType, osName, " +
                "osVersion, releaseChannel, resolution, sessionId, properties" +
                " FROM kafka_event");

        tableResult.print();

        // NOTE: do NOT call env.execute() here. The whole topology was built via
        // the Table API, so the StreamExecutionEnvironment has no operators of its
        // own and env.execute() would fail with
        // "No operators defined in streaming topology". The INSERT job is already
        // running; block on it so the local JVM stays alive.
        tableResult.await();
    }

}
